package hotnet.nio;

import hotnet.buffer.IoBuffer;
import hotnet.common.SessionState;
import hotnet.processor.AbstractPollingIoProcessor;
import hotnet.session.IoSession;

import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NioSocketProcessor extends AbstractPollingIoProcessor {
	
	private static Logger logger = LoggerFactory.getLogger(NioSocketProcessor.class);
	private Selector selector;

	public NioSocketProcessor(Executor executor) {
		super(executor);
		
		try {
			selector = Selector.open();
		} catch(IOException e) {
			throw new RuntimeException("Failed to open a selector.", e);
		}
	}

	@Override
	protected void init(IoSession session) throws Exception {
		SelectableChannel ch = (SelectableChannel) session.getChannel();
		
		ch.configureBlocking(false);
		
		/* interest read event */
		session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
	}

	@Override
	protected void wakeup() {
		wakeupCalled.getAndSet(true);
		selector.wakeup();
	}

	@Override
	protected void registerNewSelector() throws IOException {
		synchronized (selector) {
			Set<SelectionKey> keys = selector.keys();
			
			Selector newSelector = Selector.open();
			
			 for (SelectionKey key : keys) {
	                SelectableChannel ch = key.channel();

	                NioSocketSession session = (NioSocketSession) key.attachment();
	                SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
	                session.setSelectionKey(newKey);
	         }
			 
	         // Now we can close the old selector and switch it
	         selector.close();
	         selector = newSelector;
		}
		
	}

	@Override
	protected int select(long timeout) throws Exception {
		return selector.select(timeout);
	}

	@Override
	protected Iterator<IoSession> allSessions() {
		return new IoSessionIterator<IoSession>(selector.keys());
	}

	@Override
	protected Iterator<IoSession> selectedSessions() {
		return new IoSessionIterator<IoSession>(selector.selectedKeys());
	}

	@Override
	protected boolean isWritable(IoSession session) {
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid())
			return false;
		
		return key.isWritable();
	}

	@Override
	protected boolean isReadable(IoSession session) {
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid()) 
			return false;
		
		return key.isReadable();
	}

	@Override
	protected void setInterestedInWrite(IoSession session, boolean isInterested)
			throws Exception {
		
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid())
			return;
		
		int newInterestOps = key.interestOps();
		if (isInterested) {
            newInterestOps |= SelectionKey.OP_WRITE;
        } else {
            newInterestOps &= ~SelectionKey.OP_WRITE;
        }
		
		key.interestOps(newInterestOps);
	}

	@Override
	protected void setInterestedInRead(IoSession session, boolean isInterested)
			throws Exception {
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid())
			return;
		
		int newInterestOps = key.interestOps();
		if (isInterested) {
            newInterestOps |= SelectionKey.OP_READ;
        } else {
            newInterestOps &= ~SelectionKey.OP_READ;
        }
		
		key.interestOps(newInterestOps);
		
	}

	@Override
	protected boolean isInterestedInRead(IoSession session) {
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid())
			return false;
		
		return ((SelectionKey.OP_READ & key.interestOps()) != 0);
	}

	@Override
	protected boolean isInterestedInWrite(IoSession session) {
		SelectionKey key = session.getSelectionKey();
		
		if (key == null || !key.isValid())
			return false;
		
		return ((SelectionKey.OP_WRITE & key.interestOps()) != 0);
	}

	@Override
	protected int read(IoSession session, IoBuffer buf) throws Exception {
		ByteChannel channel = session.getChannel();
		
		return channel.read(buf.buf());
	}

	@Override
	protected int write(IoSession session, IoBuffer buf, int length)
			throws Exception {
		if (buf.remaining() < length) {
			return session.getChannel().write(buf.buf());
		} 
		
		int oldLimit = buf.limit();
		buf.limit(buf.position() + length);
		
		try {
			return session.getChannel().write(buf.buf());
		} finally {
			buf.limit(oldLimit);
		}
	}

	@Override
	protected boolean isBrokenConnection() throws IOException {
		boolean brokenSession = false;
		
		synchronized (selector) {
			Set<SelectionKey> keys = selector.keys();
			
			for (SelectionKey key : keys) {
				SelectableChannel channel = key.channel();
				
				if ((channel instanceof SocketChannel) && !((SocketChannel)channel).isConnected()) {
					key.cancel();
					
					brokenSession = true;
				}
			}
		}
		
		return brokenSession;
	}

	@Override
	protected void doDispose() throws Exception {
		selector.close();
		
	}

	@Override
	protected boolean isSelectorEmpty() {
		return selector.keys().isEmpty();
	}
	
	@Override
	protected void destroy(IoSession session) throws Exception {
		logger.info("destory session:" + session.getSessionId() + " close channel!");
		
		SocketChannel ch = session.getChannel();
        SelectionKey key = session.getSelectionKey();
        
        if (key != null) {
            key.cancel();
        }
        
        ch.close();
	}

	@Override
	protected SessionState getState(IoSession session) {
        SelectionKey key = session.getSelectionKey();

        if (key == null) {
            // The channel is not yet registred to a selector
            return SessionState.OPENING;
        }

        if (key.isValid()) {
            // The session is opened
            return SessionState.OPENED;
        } else {
            // The session still as to be closed
            return SessionState.CLOSING;
        }
	}
	
	protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
		private final Iterator<SelectionKey> iterator;
		
		private IoSessionIterator(Set<SelectionKey> keys) {
            iterator = keys.iterator();
        }

		@Override
		public boolean hasNext() {
			return iterator.hasNext();
		}

		@Override
		public NioSession next() {
			SelectionKey key = iterator.next();
			NioSession session = (NioSession)key.attachment();
			
			return session;
		}

		@Override
		public void remove() {
			iterator.remove();
		}
	}

}
